# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Description of an iterative method (iteration, stopping criteria, ...)"""
from hysop import Simulation, Problem
from hysop.parameters.scalar_parameter import ScalarParameter
from hysop.tools.contexts import Timer
from hysop import dprint, vprint
from hysop.tools.decorators import debug, profile
from hysop.core.graph.graph import ready
from hysop.constants import HYSOP_REAL, HYSOP_INTEGER
from hysop.tools.numpywrappers import npw
from hysop.core.mpi import main_rank, main_size, main_comm
import numpy as np
import datetime
[docs]
class PseudoSimulation(Simulation):
"""Pseudo time-iterations for iterative method"""
def __init__(self, stop_criteria, tolerance=1e-8, **kwds):
"""
Parameters
----------
stop_criteria : TensorParameter
Iterative loop test for stopping is stop_criteria<tolerance
tolerance : float (optional)
Tolerance for stopping criteria (default to 1e-8)
Notes
-----
This object implement an iterative method in a pseudo-time
inside a 'real' time interval. It can be used as sub-iterations
for a fixed-point operator.
It implements a loop defined as follows:
.. code::
while(t<end and iteration<max_iter and max(stop_criteria)>tolerance)
"""
super().__init__(**kwds)
self.stop_criteria, self.tolerance = stop_criteria, tolerance
[docs]
def advance(self, dbg=None, plot_freq=10):
"""Proceed to next time.
* Advance time and iteration number.
* Compute the new timestep
* check if simulation is over.
"""
super().advance(dbg=dbg, plot_freq=plot_freq)
if np.max(self.stop_criteria.value) <= self.tolerance:
self.is_over = True
return
[docs]
def print_state(self, verbose=None):
"""Print current iteration parameters"""
if isinstance(self.stop_criteria, ScalarParameter):
crit = f"{self.stop_criteria.value:.8g}"
else:
crit = np.array2string(
self.stop_criteria.value, formatter={"float_kind": lambda x: f"{x:.8g}"}
)
msg = "=== PseudoSimulation : {0:6d}, criteria = {1} =="
if verbose:
print(msg.format(self.current_iteration, crit))
else:
vprint(msg.format(self.current_iteration, crit))
[docs]
class IterativeMethod(Problem):
"""Overriding a Problem to enfoce a PseudoSimulation for iterative method loop.
The PseudoSimulation is created on each 'apply'. There is no meaning for the
underlying pseudo-time.
Notes
-----
Sub-timestepping is not a usual use-case for this Problem. One should
override Problem class in a proper way. Here only a pseudo-timestep is
used together with a maximal number of iteration to compute a pseudo-final time.
"""
def __new__(
cls,
stop_criteria,
tolerance=1e-8,
state_print=100,
max_iter=10000,
dt0=None,
dt=None,
configsimu=None,
**kwargs,
):
return super().__new__(cls, **kwargs)
def __init__(
self,
stop_criteria,
tolerance=1e-8,
state_print=100,
max_iter=10000,
dt0=None,
dt=None,
configsimu=None,
**kwargs,
):
super().__init__(**kwargs)
self.stop_criteria, self.tolerance = stop_criteria, tolerance
# create a pseudo-time step parameter if not given.
if dt is None:
dt = ScalarParameter(
name="pseudo-dt",
dtype=HYSOP_REAL,
min_value=np.finfo(HYSOP_REAL).eps,
initial_value=np.finfo(HYSOP_REAL).eps,
quiet=True,
)
else:
dt.value = np.finfo(HYSOP_REAL).eps
self.dt0, self.dt = dt0, dt
self.state_print = state_print
self.max_iter = max_iter
# Store the iteration number of each call to 'apply' in a parameter.
self.it_num = ScalarParameter(
name="iterations", dtype=HYSOP_INTEGER, initial_value=0, quiet=True
)
# stop_criteria reset value
self._stop_criteria_reset = npw.ones(
self.stop_criteria.shape, dtype=self.stop_criteria.dtype
)
self._stop_criteria_reset[...] = 1e10
self.configsimu = self._default_configsimu
if not configsimu is None:
self.configsimu = configsimu
def _default_configsimu(self, l, s):
pass
[docs]
@debug
@ready
def apply(self, simulation, report_freq=0, dbg=None, **kwds):
if self.to_be_skipped(self, simulation=simulation, **kwds):
return
self.run_iterations(
simulation=simulation, report_freq=report_freq, dbg=dbg, **kwds
)
[docs]
@profile
def run_iterations(self, simulation, report_freq=0, dbg=None, **kwds):
"""This function si meant to clarify the profiling data"""
vprint("=== Entering iterative method...")
self.stop_criteria.value = self._stop_criteria_reset
# Initialize the pseudo-simulation for iterative loop
# end time is computed from pseudo-time step and a maximal number of iterations.
loop = PseudoSimulation(
stop_criteria=self.stop_criteria,
tolerance=self.tolerance,
start=simulation.t(),
end=simulation.t() + self.max_iter * self.dt0,
max_iter=self.max_iter,
dt0=self.dt0,
dt=self.dt,
t=ScalarParameter(name="dummy-t", dtype=self.dt.dtype, quiet=True),
mpi_params=self.mpi_params,
)
self.configsimu(loop, simulation)
# Usual initialize-loop-finalize sequence :
if not loop._is_ready:
loop.initialize()
with Timer() as tm:
while not loop.is_over:
if loop.current_iteration % self.state_print == 0:
loop.print_state()
super().apply(simulation=loop, dbg=dbg, **kwds)
loop.advance(dbg=dbg)
if report_freq > 0 and (loop.current_iteration % report_freq) == 0:
self.profiler_report()
avg_time = self.mpi_params.comm.allreduce(tm.interval) / self.mpi_params.size
msg = "=== Leaving iterative method ({0} iterations, criteria = {1} in {2} ({3}s) {4})"
vprint(
msg.format(
loop.current_iteration,
np.array2string(
loop.stop_criteria.value,
formatter={"float_kind": lambda x: f"{x:8.8f}"},
),
datetime.timedelta(seconds=round(avg_time)),
avg_time,
(
""
if self.mpi_params.size == 1
else f", averaged over {self.mpi_params.size} ranks. "
),
)
)
self.it_num.value = loop.current_iteration
loop.finalize()
self.final_report()
[docs]
def final_report(self):
pass